19#include <zypp-media/MediaException>
20#include <zypp-media/auth/CredentialManager>
31 return (
_request->code () == ProvideMessage::Code::Attach );
38 return ( _request->code () == ProvideMessage::Code::Prov );
45 return ( _request->code () == ProvideMessage::Code::Detach );
55 DBG <<
"Queue shutdown with Items still running" << std::endl;
64 ERR <<
"Queue Worker was already initialized" << std::endl;
71 MIL <<
"Trying to start " <<
pN << std::endl;
73 if ( !
pi.isExist() ) {
78 if ( !
pi.userMayX() ) {
79 ERR <<
"Failed to start worker for " <<
workerScheme <<
" binary " <<
pi.asString() <<
" is not executable." << std::endl;
110 if (
i.isDetachRequest () )
112 return i._request.get() ==
item;
118 if (
item->code() != ProvideMessage::Code::Attach
119 &&
item->code() != ProvideMessage::Code::Prov ) {
120 ERR <<
"Can not cancel a " <<
item->code() <<
" request!" << std::endl;
126 reqRef->setCurrentQueue(
nullptr);
142 it->_request->setCurrentQueue(
nullptr );
194 reqRef->setCurrentQueue(
nullptr);
203 ERR <<
"Failed to send cancel message to worker" << std::endl;
206 reqRef->setCurrentQueue(
nullptr);
223 if ( !
reqRef->activeUrl() ) {
224 ERR <<
"Item without active URL enqueued, this is a BUG." << std::endl;
231 ERR <<
"Failed to send message to worker process." << std::endl;
244 _idleSince = std::chrono::steady_clock::now();
283 if (
i.isDetachRequest () )
287 if (
reqRef->code() != ProvideMessage::Code::Prov )
292 if (
i.isDetachRequest () )
295 if (
reqRef->code() != ProvideMessage::Code::Prov )
325 ERR <<
"Failed to execute worker" << std::endl;
353 ERR <<
"Failed to send initial message to queue worker" << std::endl;
363 ERR <<
"Worker did not sent a capabilities message, aborting" << std::endl;
391 ERR <<
"Ignoring invalid request!" << std::endl;
396 if ( ! elem._request )
398 return exp->requestId() == elem._request->provideMessage().requestId();
402 ERR <<
"Ignoring unknown request ID: " <<
exp->requestId() << std::endl;
412 ERR <<
"Failed to send Error message to worker process." << std::endl;
437 MIL <<
"Received a ProvideFinished message for a non existant request. Since this worker reported to create file artifacts, the file is cleaned up." << std::endl;
445 auto &
reqRef =req._request;
447 const auto code =
provMsg->code();
449 if ( code >= ProvideMessage::Code::FirstInformalCode && code <= ProvideMessage::Code::LastInformalCode ) {
456 }
else if ( code >= ProvideMessage::Code::FirstSuccessCode && code <= ProvideMessage::Code::LastSuccessCode ) {
464 if ( code == ProvideMessage::Code::ProvideFinished ) {
470 std::optional<zypp::ManagedFile> dataRef;
472 if ( !
reqIter->isFileRequest() ) {
473 ERR <<
"Invalid message for request ID: " <<
reqIter->_request->provideMessage().requestId() << std::endl;
487 MIL <<
"CACHE MISS, file " <<
locFName <<
" was already removed, queueing again" << std::endl;
503 reqRef->setCurrentQueue(
nullptr);
515 reqRef->setCurrentQueue(
nullptr);
522 }
else if ( code >= ProvideMessage::Code::FirstClientErrCode && code <= ProvideMessage::Code::LastSrvErrCode ) {
531 reqRef->setCurrentQueue(
nullptr);
540 }
else if ( code >= ProvideMessage::Code::FirstRedirCode && code <= ProvideMessage::Code::LastRedirCode ) {
550 reqRef->setCurrentQueue(
nullptr);
557 }
else if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
559 ERR <<
"Received Controller message from worker, this is a fatal error. Cancelling all requests!" << std::endl;
563 }
else if ( code >= ProvideMessage::Code::FirstWorkerCode && code <= ProvideMessage::Code::LastWorkerCode ) {
565 if ( code == ProvideMessage::Code::AuthDataRequest ) {
567 ERR <<
"Invalid message for request ID: " <<
reqRef->provideMessage().requestId() << std::endl;
574 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"Item was cancelled") )
581 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"Request has no owner" ) )
586 if ( !
reqRef->activeUrl() ) {
587 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"Item has no active URL, this is a bug." ) )
595 std::map<std::string, std::string>
extraVals;
602 if ( !
hdr.second.isString() ) {
603 WAR <<
"Ignoring non string value for " <<
hdr.first << std::endl;
612 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"No auth given by user." ) )
619 ERR <<
"Failed to send AuthorizationInfo to worker process." << std::endl;
632 }
else if ( code == ProvideMessage::Code::MediaChangeRequest ) {
634 if ( !
reqIter->isAttachRequest() ) {
635 ERR <<
"Invalid message for request ID: " <<
reqIter->_request->provideMessage().requestId() << std::endl;
642 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort,
"Item was cancelled" ) )
647 MIL <<
"Worker sent a MediaChangeRequest, asking the user to insert the correct medium" << std::endl;
652 freeDevs.push_back( val.asString() );
655 std::optional<std::string>
desc;
671 MIL <<
"Sending back a MediaChanged message, retrying to find medium " << std::endl;
674 ERR <<
"Failed to send MediaChanged to worker process." << std::endl;
681 MIL <<
"Sending back a MediaChangeFailure message, request will fail " << std::endl;
682 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort,
"Cancelled by User" ) )
687 MIL <<
"Sending back a MediaChangeFailure message, request will fail " << std::endl;
688 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeSkip,
"Skipped by User" ) )
695 ERR <<
"Unsupported worker request: "<<code<<
", this is a fatal error!" << std::endl;
702 ERR <<
"Received unsupported message " << msg->command() <<
" with code " << code <<
" ignoring! " << std::endl;
706 ERR <<
"Received unsupported message " << msg->command() <<
"ignoring" << std::endl;
719 while ( !
ba.empty() ) {
771 MIL <<
"Unexpected queue worker exit with code: " << exitCode << std::endl;
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
void reset()
Reset to default Ctor values.
Store and operate with byte count.
Base class for Exception.
static LogControl instance()
Singleton access.
Wrapper class for stat/lstat.
const char * c_str() const
String representation.
const std::string & asString() const
String representation.
bool empty() const
Test for an empty path.
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
SignalProxy< void(uint)> sigChannelReadyRead()
SignalProxy< void(int)> sigFinished()
static ProvideMessage createAuthInfo(const uint32_t reqId, const std::string &user, const std::string &pw, int64_t timestamp, const std::map< std::string, std::string > &extraValues={})
static ProvideMessage createCancel(const uint32_t reqId)
static constexpr std::string_view typeName
static ProvideMessage createMediaChanged(const uint32_t reqId)
static ProvideMessage createErrorResponse(const uint32_t reqId, const Code code, const std::string &reason, bool transient=false)
static expected< ProvideMessage > create(const zypp::PluginFrame &message)
const std::string queueName(ProvideQueue &q) const
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
Signal< Provide::MediaChangeAction(const std::string &, const std::string &, const int32_t, const std::vector< std::string > &, const std::optional< std::string > &) _sigMediaChange)
bool isInCache(const zypp::Pathname &downloadedFile) const
void schedule(ScheduleReason reason)
const zypp::Pathname & workerPath() const
std::list< ProvideQueue::Item >::iterator cancelActiveItem(std::list< Item >::iterator i, const std::exception_ptr &error)
zypp::ByteCount expectedProvideSize() const
ProvideQueue(ProvidePrivate &parent)
StompFrameStreamRef _messageStream
void immediateShutdown(const std::exception_ptr &reason)
std::deque< Item > _waitQueue
void cancel(ProvideRequest *item, std::exception_ptr error)
uint requestCount() const
zypp::Pathname _currentExe
bool canScheduleMore() const
void processReadyRead(int channel)
const std::string & hostname() const
std::optional< TimePoint > _idleSince
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
const Config & workerConfig() const
std::optional< TimePoint > idleSince() const
void procFinished(int exitCode)
uint activeRequests() const
std::list< Item > _activeItems
void forwardToLog(std::string &&logLine)
void enqueue(ProvideRequestRef request)
SignalProxy< void()> sigIdle()
std::list< ProvideQueue::Item >::iterator dequeueActive(std::list< Item >::iterator it)
void fatalWorkerError(const std::exception_ptr &reason=nullptr)
static constexpr uint32_t InvalidId
SignalProxy< void()> sigMessageReceived()
static Ptr create(IODevice::Ptr iostr)
const std::string & worker_name() const
WorkerType worker_type() const
static constexpr std::string_view typeName
int unlink(const Pathname &path)
Like 'unlink'.
int assert_dir(const Pathname &path, unsigned mode)
Like 'mkdir -p'.
constexpr std::string_view EffectiveUrl("effective_url")
constexpr std::string_view LastAuthTimestamp("last_auth_timestamp")
constexpr std::string_view LocalFilename("local_filename")
constexpr std::string_view CacheHit("cacheHit")
constexpr std::string_view ExpectedFilesize("expected_filesize")
constexpr std::string_view CheckExistOnly("check_existance_only")
constexpr std::string_view ATTACH_POINT("zconfig://media/AttachPoint")
bool provideDebugEnabled()
constexpr std::string_view PROVIDER_ROOT("zconfig://media/ProviderRoot")
constexpr std::string_view AGENT_STRING_CONF("zconfig://media/UserAgent")
bool isDetachRequest() const
ProvideRequestRef _request
bool isAttachRequest() const
bool isFileRequest() const
Provides API related macros.
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.